Demonstrations of Several ZooKeeper Client Usage Scenarios

Client Dependencies

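Curator was originally developed by Netflix and later donated to Apache. Its motto is "Guava is to Java what Curator is to ZooKeeper". It is not just a connection utility but a complete toolkit for distributed coordination.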


Why Not Use the Native ZK API

The native ZK API has the following problems:

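  • Watchers are one-shot: in native ZK a Watcher disappears once it fires. To keep listening, you have to re-register it inside the callback. Under high concurrency there is a "gap" between the two registrations, and changes made during that gap produce no events of their own; you only see the latest state the next time you read it.
  • Exception handling is painful: network jitter raises ConnectionLoss and a session timeout raises SessionExpired. The native API forces developers to handle these state transitions by hand, and a small mistake can leak nodes (for example, retrying a create that actually succeeded just before the ConnectionLoss leaves a duplicate sequential node behind).
  • Reinventing the wheel: the native API provides no recipes such as distributed locks or Master election, so every company ends up hand-writing its own complex implementation, and such hand-written versions are prone to deadlocks and other subtle bugs.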

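Before Curator came along, zkClient was popular for a while. It has since been largely sidelined for a simple reason: zkClient has not had a major update in years and does not support features introduced in ZK 3.5+ (such as container nodes and TTL nodes).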


Curator's Four Key Strengths

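  • A solid retry mechanism: this is what made Curator famous. It ships with several retry policies, such as exponential backoff. When the network has a transient glitch, Curator does not immediately throw the error up to the business layer; instead it retries the operation automatically after increasingly long waits (with ExponentialBackoffRetry the wait is randomized and its upper bound roughly doubles on every attempt). This smooth fault tolerance is a cornerstone of high availability in production.
  • Automatic Watcher management: Curator provides CuratorCache (older versions used NodeCache/PathChildrenCache/TreeCache), which keeps a local mirror of ZK data. However the ZK nodes change, Curator re-registers watches and updates the local cache automatically. The business layer only listens to events from the local cache and no longer has to worry about lost Watchers. (CuratorCache is built on the persistent watches introduced in ZooKeeper 3.6.)
  • Industrial-strength Recipes (a library of distributed algorithms): Curator packages the hardest distributed coordination logic into ready-to-use Recipes. This code has been battle-tested for years under heavy concurrent traffic at Netflix and other large companies, and is far more stable than a typical hand-written version.
    • Distributed lock: InterProcessMutex
    • Leader election: LeaderSelector
    • Distributed counter: DistributedAtomicInteger
    • Distributed queue: DistributedQueue
  • Fluent API: instead of the native API's cluttered parameter lists, Curator uses chained method calls, which makes code far more readable.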
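To make the retry behaviour concrete, here is a small stand-alone sketch of randomized exponential backoff. It assumes the formula used by Curator's ExponentialBackoffRetry (base sleep multiplied by max(1, random.nextInt(2^(retryCount + 1))), capped at a maximum sleep); the class and method names are hypothetical and the code does not depend on Curator.

```java
import java.util.Random;

/** Hypothetical sketch of randomized exponential backoff (not Curator's class). */
public class BackoffSketch {

    /**
     * Sleep time before a retry; retryCount starts at 0.
     * The multiplier is drawn from [1, 2^(retryCount + 1) - 1], so the ceiling roughly doubles each time.
     * Assumption: retryCount stays below 30 so that the int shift cannot overflow.
     */
    public static long sleepMs(int retryCount, int baseSleepMs, int maxSleepMs, Random random) {
        long factor = Math.max(1, random.nextInt(1 << (retryCount + 1)));
        long sleep = baseSleepMs * factor;
        return Math.min(sleep, maxSleepMs); // pin to the configured maximum
    }

    /** Upper bound of sleepMs for a given retry, ignoring the cap. */
    public static long ceilingMs(int retryCount, int baseSleepMs) {
        return (long) baseSleepMs * ((1L << (retryCount + 1)) - 1);
    }

    public static void main(String[] args) {
        Random random = new Random(42);
        for (int retry = 0; retry < 4; retry++) {
            System.out.printf("retry %d: sleep %d ms (ceiling %d ms)%n",
                    retry, sleepMs(retry, 1000, 60_000, random), ceilingMs(retry, 1000));
        }
    }
}
```

With a 1 s base, the ceilings are 1 s, 3 s, 7 s, 15 s and so on. The random factor spreads reconnect attempts from many clients over time, so they do not all hit the ensemble at the same instant after an outage.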


Dependency Configuration

<properties>
    <maven.compiler.source>17</maven.compiler.source>
    <maven.compiler.target>17</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <curator.version>5.9.0</curator.version><!-- supports the new features of ZK 3.5+ -->
    <zookeeper.version>3.9.3</zookeeper.version>
    <guava.version>33.4.0-jre</guava.version>
    <logback.version>1.5.25</logback.version>
    <lombok.version>1.18.44</lombok.version>
</properties>

<dependencies>
    <!-- Wraps the low-level ZK API -->
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>${curator.version}</version>
        <exclusions>
            <!-- The native ZooKeeper artifact is fairly "heavy": depending on the version it can pull in logging
                 libraries (such as log4j, slf4j-log4j12) and networking libraries (such as netty).
                 In practice our rule is to cut off the transitive copy first and then declare the cleanest dependency
                 with the exact version ourselves, so that we control the ZK version. -->
            <exclusion>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <!-- Higher-level features: cache-based event listening, election, distributed locks, distributed counters, distributed barriers, etc. -->
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>${curator.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <!-- The ZooKeeper client excluded above, declared explicitly so that we control its version
         (pick a version compatible with your ZK servers) -->
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>${zookeeper.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>${lombok.version}</version>
        <!-- compile-time annotation processor only -->
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>${guava.version}</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>${logback.version}</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-core</artifactId>
        <version>${logback.version}</version>
    </dependency>
    <dependency>
        <groupId>org.junit.jupiter</groupId>
        <artifactId>junit-jupiter</artifactId>
        <version>5.12.2</version>
        <scope>test</scope>
    </dependency>
</dependencies>


Demo Code

Creating a Curator Client Instance

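import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * Curator client factory
 */
@Slf4j
public class CuratorClientFactory {

    private static final int DEFAULT_BASE_SLEEP_TIME_MS = 1000;
    private static final int DEFAULT_MAX_RETRIES = 3;

    // Recommended 60s; the server still negotiates it into [minSessionTimeout, maxSessionTimeout]
    private static final int DEFAULT_SESSION_TIMEOUT_MS = 60000;
    private static final int DEFAULT_CONNECTION_TIMEOUT_MS = 15000; // recommended 15s

    /**
     * Option 1: quickly create a simple client (the caller must call start())
     */
    public static CuratorFramework createSimple(String connectionString) {
        // Exponential backoff, so retries do not flood the servers during a network storm
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(DEFAULT_BASE_SLEEP_TIME_MS, DEFAULT_MAX_RETRIES);
        return CuratorFrameworkFactory.newClient(connectionString, retryPolicy);
    }

    /**
     * Option 2: fully customized creation (recommended for production)
     * @param connectionString connection string
     * @param namespace        namespace (the key to isolating different businesses)
     */
    public static CuratorFramework createWithOptions(String connectionString, String namespace) {
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(DEFAULT_BASE_SLEEP_TIME_MS, DEFAULT_MAX_RETRIES);
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString(connectionString)
                .retryPolicy(retryPolicy)
                .sessionTimeoutMs(DEFAULT_SESSION_TIMEOUT_MS)
                .connectionTimeoutMs(DEFAULT_CONNECTION_TIMEOUT_MS)
                // 1. Namespace: every operation is automatically prefixed with /<namespace>, giving logical isolation
                .namespace(namespace)
                // 2. ACL authentication (add credentials here if needed)
                // .authorization("digest", "user:pass".getBytes())
                .build();

        // 3. Register a connection state listener; in production you must monitor disconnections
        client.getConnectionStateListenable().addListener((c, newState) -> {
            log.info("Curator state changed: {}", newState);
            if (newState == ConnectionState.LOST) {
                log.error("ZK connection lost for good; the service may go offline, please check the network!");
            }
        });

        // 4. Start automatically (saves business code from calling start() by hand)
        if (client.getState() == CuratorFrameworkState.LATENT) {
            client.start();
        }
        return client;
    }
}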
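One caveat on DEFAULT_SESSION_TIMEOUT_MS: the value is only a request. A ZooKeeper server clamps it into [minSessionTimeout, maxSessionTimeout], which default to 2 × tickTime and 20 × tickTime when not configured, so with a tickTime of 2000 ms a 60 s request ends up as 40 s unless the server raises maxSessionTimeout. The sketch below models that server-side rule; the class name is hypothetical and it does not query a real server.

```java
/** Hypothetical model of how a ZooKeeper server clamps the requested session timeout. */
public final class SessionTimeoutSketch {

    private SessionTimeoutSketch() {
    }

    /**
     * Negotiated timeout, assuming minSessionTimeout and maxSessionTimeout are left at their
     * defaults of 2 * tickTime and 20 * tickTime.
     */
    public static int negotiate(int requestedMs, int tickTimeMs) {
        int min = 2 * tickTimeMs;
        int max = 20 * tickTimeMs;
        return Math.max(min, Math.min(requestedMs, max));
    }

    public static void main(String[] args) {
        int tickTime = 2000; // the tickTime used in ZooKeeper's sample configuration
        System.out.println(negotiate(60_000, tickTime)); // 40000: capped at 20 * tickTime
        System.out.println(negotiate(15_000, tickTime)); // 15000: already inside the range
        System.out.println(negotiate(1_000, tickTime));  // 4000: raised to 2 * tickTime
    }
}
```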


Creating a Node with Curator

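// The test methods in this article are members of one JUnit 5 test class annotated with Lombok's @Slf4j.
// Main imports: org.apache.curator.framework.CuratorFramework, org.apache.curator.utils.CloseableUtils,
// org.apache.zookeeper.CreateMode, org.apache.zookeeper.data.Stat, org.junit.jupiter.api.Test,
// java.nio.charset.StandardCharsets, java.util.List
private final String connectionString = "192.168.1.149:2181";

/**
 * Create a node
 */
@Test
public void createNode() {
    // Client instance
    CuratorFramework client = CuratorClientFactory.createSimple(connectionString);
    try {
        // Start the client instance and connect to the server
        client.start();
        // Create a ZNode whose data is payload;
        // creatingParentsIfNeeded() also creates /test and /test/CRUD if they are missing
        String data = "hello";
        byte[] payload = data.getBytes(StandardCharsets.UTF_8);
        String zkPath = "/test/CRUD/node-1";
        client.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .forPath(zkPath, payload);
    } catch (Exception e) {
        // e.g. NodeExistsException when the test is run a second time
        e.printStackTrace();
    } finally {
        CloseableUtils.closeQuietly(client);
    }
}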


Reading a Node with Curator

/**
 * Read a node
 * checkExists(), getData() and getChildren() have one thing in common:
 * they return a builder instance and do not execute immediately.
 * You describe the operation through chained calls on the builder, and the actual operation
 * runs against the node only when forPath(String znodePath) is called at the end of the chain.
 */
@Test
public void readNode() {
    // Create the client
    CuratorFramework client = CuratorClientFactory.createSimple(connectionString);
    try {
        // Start the client instance and connect to the server
        client.start();
        String zkPath = "/test/CRUD/node-1";
        Stat stat = client.checkExists().forPath(zkPath); // null if the node does not exist
        if (null != stat) {
            // Read the node data
            byte[] payload = client.getData().forPath(zkPath);
            String data = new String(payload, StandardCharsets.UTF_8);
            log.info("read data:{}", data); // hello
            String parentPath = "/test";
            List<String> children = client.getChildren().forPath(parentPath);
            for (String child : children) {
                log.info("child:{}", child); // CRUD
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        CloseableUtils.closeQuietly(client);
    }
}
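The builder behaviour described in the Javadoc above (chained calls only describe the operation; forPath triggers it) can be illustrated with a toy builder. Everything here (ToyClient, GetDataBuilder, the in-memory map) is hypothetical and only imitates the shape of Curator's API; it is not Curator's implementation.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy illustration of a deferred fluent builder (hypothetical, not Curator code). */
public class ToyClient {
    private final Map<String, byte[]> store = new HashMap<>();
    /** Log of operations that actually ran against the store. */
    public final List<String> executed = new ArrayList<>();

    public ToyClient put(String path, String value) {
        store.put(path, value.getBytes(StandardCharsets.UTF_8));
        return this;
    }

    /** Like client.getData(): only returns a builder, nothing is read yet. */
    public GetDataBuilder getData() {
        return new GetDataBuilder();
    }

    public class GetDataBuilder {
        private boolean watch;

        /** Chained option: only records configuration and returns the same builder. */
        public GetDataBuilder watched() {
            this.watch = true;
            return this;
        }

        /** Terminal call: the read happens only here. */
        public String forPath(String path) {
            executed.add("getData " + path + " watch=" + watch);
            byte[] data = store.get(path);
            if (data == null) {
                throw new IllegalArgumentException("NoNode for " + path);
            }
            return new String(data, StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) {
        ToyClient client = new ToyClient().put("/test/CRUD/node-1", "hello");
        GetDataBuilder builder = client.getData().watched();
        System.out.println("executed before forPath: " + client.executed.size()); // 0
        System.out.println(builder.forPath("/test/CRUD/node-1"));                  // hello
        System.out.println(client.executed); // [getData /test/CRUD/node-1 watch=true]
    }
}
```

Separating "describe" from "execute" is what lets one builder type offer optional steps (watches, background callbacks, version checks) without the long overloaded parameter lists of the native API.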


Updating a Node with Curator

Synchronous update:

/**
 * Update a node synchronously
 */
@Test
public void updateNode() {
    // Create the client
    CuratorFramework client = CuratorClientFactory.createSimple(connectionString);
    try {
        // Start the client instance and connect to the server
        client.start();
        String data = "hello world";
        byte[] payload = data.getBytes(StandardCharsets.UTF_8);
        String zkPath = "/test/CRUD/node-1";
        client.setData().forPath(zkPath, payload);
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        CloseableUtils.closeQuietly(client);
    }
}

Asynchronous update:

/**
 * Update a node - asynchronous mode
 */
@Test
public void updateNodeAsync() {
    CuratorFramework client = CuratorClientFactory.createSimple(connectionString);
    try {
        client.start();
        String data = "hello, everybody!";
        byte[] payload = data.getBytes(StandardCharsets.UTF_8);
        String zkPath = "/test/CRUD/node-1";

        // Make sure the node exists (setData on a missing node fails with NoNode)
        if (client.checkExists().forPath(zkPath) == null) {
            client.create().creatingParentsIfNeeded().forPath(zkPath);
        }

        CountDownLatch done = new CountDownLatch(1); // java.util.concurrent.CountDownLatch
        client.setData()
                .inBackground((c, event) -> { // bind the callback
                    // event carries all the information about the result (resultCode 0 means OK)
                    int resultCode = event.getResultCode();
                    String path = event.getPath();
                    Object ctx = event.getContext();
                    log.info("Callback - resultCode:{} | Path:{} | Context:{} | Type:{}", resultCode, path, ctx, event.getType());
                    done.countDown();
                }, "MyContextData") // a context object can be passed to the callback
                .forPath(zkPath, payload);
        // forPath returns immediately; wait for the callback before the client is closed
        done.await(10, TimeUnit.SECONDS);
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        CloseableUtils.closeQuietly(client);
    }
}


Deleting a Node with Curator

@Test
public void deleteNode() { // Note: like the update, deletion can also run asynchronously via inBackground()
    CuratorFramework client = CuratorClientFactory.createSimple(connectionString);
    try {
        client.start();
        String zkPath = "/test/CRUD/node-1";
        // delete() fails with NotEmptyException if the node has children;
        // add deletingChildrenIfNeeded() to remove a whole subtree
        client.delete().forPath(zkPath);
        String parentPath = "/test/CRUD";
        List<String> children = client.getChildren().forPath(parentPath);
        for (String child : children) {
            log.info("child is:{}", child);
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        CloseableUtils.closeQuietly(client);
    }
}


Distributed Event Listening

One-shot registration:

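/**
 * Register a Watcher instance, then call setData to change the node's content twice. Although the content
 * changes twice, the watcher sees only one event. In other words, a Watcher registration is one-shot: by the
 * time of the second change it has already been consumed, so that change is not captured.
 *
 * To watch repeatedly, you have to register again through the builder's usingWatcher method every time.
 * Watchers therefore do not suit scenarios where node data or nodes change frequently; they fit special,
 * infrequent events such as session timeouts or authentication failures.
 */
@Test
public void testWatcher() {
    CuratorFramework client = CuratorClientFactory.createSimple(connectionString);
    try {
        client.start();
        String zkPath = "/test/CRUD/node-1";
        if (client.checkExists().forPath(zkPath) == null) {
            client.create().creatingParentsIfNeeded().forPath(zkPath);
        }

        // The cast selects the org.apache.zookeeper.Watcher overload of usingWatcher
        byte[] content = client.getData()
                .usingWatcher((Watcher) watchedEvent -> log.info("Watched change: {}", watchedEvent))
                .forPath(zkPath);
        log.info("Watched node content: {}", new String(content, StandardCharsets.UTF_8));

        // First change of the node data (fires the watcher)
        client.setData().forPath(zkPath, "first change".getBytes(StandardCharsets.UTF_8));
        // Second change of the node data (no event: the registration has been consumed)
        client.setData().forPath(zkPath, "second change".getBytes(StandardCharsets.UTF_8));
        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(10));
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        CloseableUtils.closeQuietly(client);
    }
}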
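The one-shot semantics can be modelled in a few lines of plain Java. This is a hypothetical in-memory model (OneShotWatchDemo and ToyZNode are made-up names), not ZooKeeper code: the list of armed watchers is emptied as soon as they fire, so the second setData produces no notification unless the callback re-arms itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/** Hypothetical in-memory model of ZooKeeper's one-shot data watches (not ZooKeeper code). */
public class OneShotWatchDemo {

    /** A toy znode whose watchers fire at most once, like standard ZooKeeper watches. */
    public static class ToyZNode {
        private String data = "";
        private List<Consumer<String>> watchers = new ArrayList<>();

        /** Reads the data and, if a watcher is given, arms it for the next change only. */
        public String getData(Consumer<String> watcher) {
            if (watcher != null) {
                watchers.add(watcher);
            }
            return data;
        }

        public void setData(String newData) {
            data = newData;
            // Detach the armed watchers before notifying them: each registration is consumed here
            List<Consumer<String>> toFire = watchers;
            watchers = new ArrayList<>();
            toFire.forEach(w -> w.accept(newData));
        }
    }

    /** Counts notifications for two consecutive setData calls. */
    public static int countEvents(boolean reRegister) {
        ToyZNode node = new ToyZNode();
        List<String> events = new ArrayList<>();
        AtomicReference<Consumer<String>> watcher = new AtomicReference<>();
        watcher.set(value -> {
            events.add(value);
            if (reRegister) {
                node.getData(watcher.get()); // re-arm inside the callback, like calling usingWatcher again
            }
        });
        node.getData(watcher.get());
        node.setData("first change");
        node.setData("second change");
        return events.size();
    }

    public static void main(String[] args) {
        System.out.println("one-shot: " + countEvents(false)); // prints "one-shot: 1"
        System.out.println("re-armed: " + countEvents(true));  // prints "re-armed: 2"
    }
}
```

The toy is synchronous, so re-arming leaves no gap. Against a real server, re-registering is a network round-trip, which is exactly where the "gap" described at the start of this article comes from.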

Persistent listening: the old NodeCache style:

/**
 * Curator's classic cache recipes come in several types: NodeCache, PathChildrenCache and TreeCache
 * (all deprecated since Curator 5.0 in favour of CuratorCache).
 *
 * NodeCache watches a ZNode itself: if the ZNode is created, updated or deleted on ZooKeeper,
 * NodeCache updates its cache and fires an event to the registered listeners.
 */
@Test
public void testNodeCache() {
    CuratorFramework client = CuratorClientFactory.createSimple(connectionString);
    String workerPath = "/test/CRUD/node-1";
    try {
        client.start();
        if (client.checkExists().forPath(workerPath) == null) {
            client.create().creatingParentsIfNeeded().forPath(workerPath);
        }

        // try-with-resources closes the cache and its watches at the end
        try (NodeCache nodeCache = new NodeCache(client, workerPath, false)) {
            // Register the node event listener
            nodeCache.getListenable().addListener(() -> {
                ChildData childData = nodeCache.getCurrentData();
                if (childData == null) { // the node has been deleted
                    log.info("ZNode deleted, path={}", workerPath);
                    return;
                }
                log.info("ZNode state changed, path={}", childData.getPath());
                log.info("ZNode state changed, data={}", new String(childData.getData(), StandardCharsets.UTF_8));
                log.info("ZNode state changed, stat={}", childData.getStat());
            });
            nodeCache.start();

            // First change of the node data
            client.setData().forPath(workerPath, "first change".getBytes(StandardCharsets.UTF_8));
            Thread.sleep(1000);

            // Second change of the node data
            client.setData().forPath(workerPath, "second change".getBytes(StandardCharsets.UTF_8));
            Thread.sleep(1000);

            // Third change of the node data
            client.setData().forPath(workerPath, "third change".getBytes(StandardCharsets.UTF_8));
            Thread.sleep(1000);
        }
    } catch (Exception e) {
        log.error("NodeCache listening failed, path={}", workerPath, e);
    } finally {
        CloseableUtils.closeQuietly(client);
    }
}

/**
 * PathChildrenCache watches the children of a ZNode and caches their state: if a child of the ZNode
 * is created, updated or deleted, PathChildrenCache updates its cache and fires an event to the
 * registered listeners.
 */
@Test
public void testPathChildrenCache() {
    CuratorFramework client = CuratorClientFactory.createSimple(connectionString);
    String zkPath = "/test/CRUD";
    try {
        client.start();
        if (client.checkExists().forPath(zkPath) == null) {
            client.create().creatingParentsIfNeeded().forPath(zkPath);
        }

        // cacheData = true: also cache each child's data
        try (PathChildrenCache cache = new PathChildrenCache(client, zkPath, true)) {
            // Add a listener
            cache.getListenable().addListener((c, event) -> log.info("Callback - event:{}", event));
            // Start mode: BUILD_INITIAL_CACHE primes the cache with the existing children before start() returns
            cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
            Thread.sleep(1000);

            // Create three children
            for (int i = 2; i < 5; i++) {
                client.create()
                        .withMode(CreateMode.PERSISTENT)
                        .forPath(zkPath + "/node-" + i, ("data" + i).getBytes(StandardCharsets.UTF_8));
            }
            Thread.sleep(1000);

            // Delete the three children
            for (int i = 2; i < 5; i++) {
                client.delete().forPath(zkPath + "/node-" + i);
            }
            LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(10));
        }
    } catch (Exception e) {
        log.error("PathChildrenCache listening failed, path={}", zkPath, e);
    } finally {
        CloseableUtils.closeQuietly(client);
    }
}

/**
 * TreeCache watches both the node itself and its whole subtree
 */
@Test
public void testTreeCache() {
    CuratorFramework client = CuratorClientFactory.createSimple(connectionString);
    String workerPath = "/test/CRUD";
    try {
        client.start();
        if (client.checkExists().forPath(workerPath) == null) {
            client.create().creatingParentsIfNeeded().forPath(workerPath);
        }

        try (TreeCache treeCache = new TreeCache(client, workerPath)) {
            // Register the listener
            treeCache.getListenable().addListener((c, event) -> {
                ChildData data = event.getData();
                if (data == null) { // e.g. INITIALIZED or connection events carry no node data
                    log.info("Event without node data: {}", event);
                    return;
                }
                String value = data.getData() == null ? null : new String(data.getData(), StandardCharsets.UTF_8);
                switch (event.getType()) {
                    case NODE_ADDED:
                        log.info("[TreeCache] node added, path={}, data={}", data.getPath(), value);
                        break;
                    case NODE_UPDATED:
                        log.info("[TreeCache] node updated, path={}, data={}", data.getPath(), value);
                        break;
                    case NODE_REMOVED:
                        log.info("[TreeCache] node removed, path={}, data={}", data.getPath(), value);
                        break;
                    default:
                        break;
                }
            });

            // Start the cache view
            treeCache.start();
            Thread.sleep(1000);

            // Create three children
            for (int i = 2; i < 5; i++) {
                client.create()
                        .withMode(CreateMode.PERSISTENT)
                        .forPath(workerPath + "/node-" + i, ("data" + i).getBytes(StandardCharsets.UTF_8));
            }
            Thread.sleep(1000);

            // Delete the three children
            for (int i = 2; i < 5; i++) {
                client.delete().forPath(workerPath + "/node-" + i);
            }
            Thread.sleep(1000);

            // Delete the node itself; deletingChildrenIfNeeded() also removes leftover children such as node-1,
            // otherwise delete() would fail with NotEmptyException
            client.delete().deletingChildrenIfNeeded().forPath(workerPath);
            LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(10));
        }
    } catch (Exception e) {
        log.error("TreeCache listening failed, path={}", workerPath, e);
    } finally {
        CloseableUtils.closeQuietly(client);
    }
}

Persistent listening: the new CuratorCache style

/**
 * Replacement for NodeCache (watches only the node itself)
 * CuratorCache relies on the persistent watches of ZooKeeper 3.6+
 */
@Test
public void testNodeCacheReplacement() {
    CuratorFramework client = CuratorClientFactory.createSimple(connectionString);
    String workerPath = "/test/CRUD/node-1";
    try {
        client.start();
        if (client.checkExists().forPath(workerPath) == null) {
            client.create().creatingParentsIfNeeded().forPath(workerPath);
        }

        // New style: build the cache; SINGLE_NODE_CACHE restricts it to the node itself, like NodeCache
        try (CuratorCache cache = CuratorCache.build(client, workerPath, CuratorCache.Options.SINGLE_NODE_CACHE)) {
            CuratorCacheListener listener = CuratorCacheListener.builder()
                    // Handle only creation and change events (oldNode is null for a creation)
                    .forCreatesAndChanges((oldNode, newNode) -> {
                        log.info("ZNode state changed, path={}", newNode.getPath());
                        log.info("data={}", new String(newNode.getData(), StandardCharsets.UTF_8));
                    })
                    .build();

            cache.listenable().addListener(listener);
            cache.start();

            // Simulate data changes
            client.setData().forPath(workerPath, "first change".getBytes(StandardCharsets.UTF_8));
            Thread.sleep(1000);

            client.setData().forPath(workerPath, "second change".getBytes(StandardCharsets.UTF_8));
            Thread.sleep(1000);
        }
    } catch (Exception e) {
        log.error("CuratorCache (node) listening failed", e);
    } finally {
        CloseableUtils.closeQuietly(client);
    }
}

/**
 * Replacement for PathChildrenCache (watches the children)
 */
@Test
public void testPathChildrenCacheReplacement() {
    CuratorFramework client = CuratorClientFactory.createSimple(connectionString);
    String zkPath = "/test/CRUD";
    try {
        client.start();
        if (client.checkExists().forPath(zkPath) == null) {
            client.create().creatingParentsIfNeeded().forPath(zkPath);
        }

        // try-with-resources releases the cache automatically
        try (CuratorCache cache = CuratorCache.build(client, zkPath)) {
            CuratorCacheListener listener = CuratorCacheListener.builder()
                    // Listen to all events: node creation, change and deletion
                    .forAll((type, oldData, newData) -> {
                        ChildData node = (newData != null) ? newData : oldData;
                        // CuratorCache is recursive: skip the parent itself and any grandchildren,
                        // keeping only direct children (ZKPaths is org.apache.curator.utils.ZKPaths)
                        if (node == null || !ZKPaths.getPathAndNode(node.getPath()).getPath().equals(zkPath)) {
                            return;
                        }
                        log.info("Child event - Type: {}, Path: {}", type, node.getPath());
                    })
                    .build();

            cache.listenable().addListener(listener);
            cache.start();

            // Create children
            for (int i = 2; i < 5; i++) {
                client.create().withMode(CreateMode.PERSISTENT)
                        .forPath(zkPath + "/node-" + i, ("data" + i).getBytes(StandardCharsets.UTF_8));
            }

            Thread.sleep(1000);
            // Delete children
            for (int i = 2; i < 5; i++) {
                client.delete().forPath(zkPath + "/node-" + i);
            }
            LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
        }
    } catch (Exception e) {
        log.error("CuratorCache (children) listening failed", e);
    } finally {
        CloseableUtils.closeQuietly(client);
    }
}

/**
 * Replacement for TreeCache (recursively watches the whole subtree)
 */
@Test
public void testTreeCacheReplacement() {
    CuratorFramework client = CuratorClientFactory.createSimple(connectionString);
    String workerPath = "/test/CRUD";
    try {
        client.start();
        if (client.checkExists().forPath(workerPath) == null) {
            client.create().creatingParentsIfNeeded().forPath(workerPath);
        }

        // TreeCache behaviour: by default CuratorCache watches workerPath and all its descendants recursively
        try (CuratorCache cache = CuratorCache.build(client, workerPath)) {
            CuratorCacheListener listener = CuratorCacheListener.builder()
                    .forCreates(node -> log.info("[Tree] node added: {}", node.getPath()))
                    .forChanges((oldNode, newNode) -> log.info("[Tree] node updated: {}", newNode.getPath()))
                    .forDeletes(node -> log.info("[Tree] node removed: {}", node.getPath()))
                    .build();

            cache.listenable().addListener(listener);
            cache.start();

            // Business operations...
            client.create().forPath(workerPath + "/node-sub", "sub-data".getBytes(StandardCharsets.UTF_8));
            Thread.sleep(500);
            client.setData().forPath(workerPath + "/node-sub", "new-sub-data".getBytes(StandardCharsets.UTF_8));
            Thread.sleep(500);
            client.delete().forPath(workerPath + "/node-sub");

            LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
        }
    } catch (Exception e) {
        log.error("CuratorCache (tree) listening failed", e);
    } finally {
        CloseableUtils.closeQuietly(client);
    }
}
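Because CuratorCache is recursive by default, a listener that stands in for PathChildrenCache has to ignore both the parent's own events and any deeper descendants. The path test can be isolated into a small pure-Java helper; DirectChildFilter is a hypothetical name, and it assumes normalized ZooKeeper paths (absolute, no trailing slash except for the root "/").

```java
/** Hypothetical helper: is a path exactly one level below a parent path? */
public final class DirectChildFilter {

    private DirectChildFilter() {
    }

    /** e.g. /test/CRUD/node-2 is a direct child of /test/CRUD, but /test/CRUD/node-2/sub is not. */
    public static boolean isDirectChild(String parent, String path) {
        if (path == null) {
            return false;
        }
        String prefix = parent.endsWith("/") ? parent : parent + "/"; // the root "/" already ends with '/'
        if (!path.startsWith(prefix)) {
            return false; // not under the parent (also rejects siblings such as /test/CRUDX)
        }
        String rest = path.substring(prefix.length());
        return !rest.isEmpty() && rest.indexOf('/') < 0; // exactly one more path segment
    }

    public static void main(String[] args) {
        System.out.println(isDirectChild("/test/CRUD", "/test/CRUD/node-2"));     // true
        System.out.println(isDirectChild("/test/CRUD", "/test/CRUD"));            // false
        System.out.println(isDirectChild("/test/CRUD", "/test/CRUD/node-2/sub")); // false
    }
}
```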


ID Generator

Basic ID generator:

import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.CreateMode;

/**
 * The most basic ZK ID generator
 */
@Slf4j
public class IDMaker {
    private final CuratorFramework client;
    private final String nodePath;

    public IDMaker(CuratorFramework client, String nodePath) {
        this.client = client;
        this.nodePath = nodePath;
    }

    public String makeId() {
        try {
            // 1. Create an ephemeral sequential node; ZK appends a counter maintained by the parent node
            String fullPath = client.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                    .forPath(nodePath);

            // 2. Extract the ID (ZK always formats the sequence suffix as 10 zero-padded digits)
            String id = fullPath.substring(fullPath.length() - 10);

            // 3. Delete the node in the background to reduce ZK storage pressure (does not block returning the ID);
            //    the parent's counter keeps increasing, so IDs are not reused
            client.delete().guaranteed().inBackground().forPath(fullPath);
            return id;
        } catch (Exception e) {
            log.error("Failed to generate distributed ID", e);
            throw new RuntimeException("ID generation failed", e);
        }
    }

    public static void main(String[] args) {
        CuratorFramework client = CuratorClientFactory.createSimple("192.168.1.149:2181");
        if (client.getState() == CuratorFrameworkState.LATENT) { // start the client and connect to the server
            client.start();
        }
        try {
            String nodePath = "/test/IDMaker/ID-";
            IDMaker idMaker = new IDMaker(client, nodePath);
            for (int i = 0; i < 10; i++) {
                String id = idMaker.makeId();
                log.info("ID #{} created: {}", i, id);
            }
        } finally {
            CloseableUtils.closeQuietly(client);
        }
    }
}
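Since ZooKeeper appends a 10-digit, zero-padded counter to a sequential node's name, the ID can also be parsed into a number instead of being kept as a string. A minimal sketch; SequenceSuffix is a hypothetical helper and it assumes the path came from a create() call with a SEQUENTIAL mode. Per the ZooKeeper documentation, the counter is a signed 32-bit int kept by the parent node, so it overflows after 2147483647.

```java
/** Hypothetical helper: parses the counter ZooKeeper appends to sequential node names. */
public final class SequenceSuffix {
    private static final int SUFFIX_LENGTH = 10; // ZooKeeper pads the counter to 10 digits

    private SequenceSuffix() {
    }

    /** e.g. "/test/IDMaker/ID-0000000042" -> 42 */
    public static long parse(String createdPath) {
        if (createdPath == null || createdPath.length() < SUFFIX_LENGTH) {
            throw new IllegalArgumentException("Not a sequential node path: " + createdPath);
        }
        String suffix = createdPath.substring(createdPath.length() - SUFFIX_LENGTH);
        for (int i = 0; i < suffix.length(); i++) {
            char c = suffix.charAt(i);
            if (c < '0' || c > '9') {
                throw new IllegalArgumentException("Missing 10-digit sequence suffix: " + createdPath);
            }
        }
        return Long.parseLong(suffix);
    }

    public static void main(String[] args) {
        System.out.println(parse("/test/IDMaker/ID-0000000042")); // 42
    }
}
```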

Segment-based ZK ID generator:

import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicLong;

/**
 * @author KJ
 * @description Segment-based ZK ID generator
 */
@Slf4j
public class IDMaker2 {

    private final CuratorFramework client;
    private final String zkPath;
    private final AtomicLong currentId = new AtomicLong();
    private long maxId; // upper bound of the current segment (guarded by the synchronized nextId)
    private final int step = 10; // size of each segment claimed from ZK

    public IDMaker2(CuratorFramework client, String zkPath) {
        this.client = client;
        this.zkPath = zkPath;
    }

    /**
     * Core logic: increment locally and only go to ZK once the current segment is used up
     */
    public synchronized long nextId() {
        if (currentId.get() >= maxId) {
            getNextSegment();
        }
        return currentId.incrementAndGet();
    }

    private void getNextSegment() {
        while (true) { // retry loop instead of unbounded recursion
            try {
                // 1. Make sure the counter node exists (important; another instance may create it concurrently)
                if (client.checkExists().forPath(zkPath) == null) {
                    try {
                        client.create().creatingParentsIfNeeded().forPath(zkPath, "0".getBytes(StandardCharsets.UTF_8));
                    } catch (KeeperException.NodeExistsException ignored) {
                        // another instance won the race; just use its node
                    }
                }

                // 2. Optimistic lock on dataVersion so several instances can compete for segments safely
                Stat stat = new Stat();
                byte[] data = client.getData().storingStatIn(stat).forPath(zkPath);
                long remoteValue = Long.parseLong(new String(data, StandardCharsets.UTF_8));
                long nextMaxId = remoteValue + step;

                // 3. Write the new value back; throws BadVersionException if someone updated it after our read
                client.setData()
                        .withVersion(stat.getVersion())
                        .forPath(zkPath, String.valueOf(nextMaxId).getBytes(StandardCharsets.UTF_8));

                // 4. Update the local in-memory buffer
                this.maxId = nextMaxId;
                this.currentId.set(remoteValue);
                log.info("Claimed segment: {} - {}", remoteValue + 1, nextMaxId);
                return;
            } catch (KeeperException.BadVersionException e) {
                // Version conflict: another instance claimed a segment first, so re-read and retry
                log.debug("Segment version conflict, retrying");
            } catch (Exception e) {
                throw new RuntimeException("Failed to claim a segment", e);
            }
        }
    }

    public static void main(String[] args) {
        CuratorFramework client = CuratorClientFactory.createSimple("192.168.1.149:2181");
        client.start();
        try {
            String nodePath = "/test/IDMaker/ID-";
            IDMaker2 idMaker = new IDMaker2(client, nodePath);
            for (int i = 0; i < 30; i++) {
                long id = idMaker.nextId();
                log.info("ID #{} created: {}", i, id);
            }
        } finally {
            CloseableUtils.closeQuietly(client);
        }
    }
}
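The segment approach is only correct because of the version check: two instances may read the same value, but only the first conditional write wins, and the loser re-reads and retries. The sketch below models the counter znode as an in-memory (value, version) pair updated with compare-and-set; SegmentSketch, VersionedNode and SegmentAllocator are hypothetical stand-ins for the real getData().storingStatIn(...) and setData().withVersion(...) calls, so no ZooKeeper is involved.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical in-memory model of segment allocation guarded by a version check (not Curator code). */
public class SegmentSketch {

    /** Immutable snapshot of the counter node: its value and its dataVersion. */
    public record Snapshot(long value, int version) {
    }

    /** Stand-in for the counter znode: reads return a snapshot, writes are conditional on it. */
    public static class VersionedNode {
        private final AtomicReference<Snapshot> state = new AtomicReference<>(new Snapshot(0, 0));

        /** Like getData().storingStatIn(stat). */
        public Snapshot read() {
            return state.get();
        }

        /** Like setData().withVersion(v): AtomicReference compares references, so only the snapshot we read can win. */
        public boolean compareAndSet(Snapshot expected, long newValue) {
            return state.compareAndSet(expected, new Snapshot(newValue, expected.version() + 1));
        }
    }

    /** One application instance handing out IDs from its locally cached segment. */
    public static class SegmentAllocator {
        private final VersionedNode node;
        private final int step;
        private long current; // last ID handed out
        private long max;     // last ID of the current segment

        public SegmentAllocator(VersionedNode node, int step) {
            this.node = node;
            this.step = step;
        }

        public synchronized long nextId() {
            while (current >= max) { // segment exhausted: claim the next one
                Snapshot snap = node.read();
                if (node.compareAndSet(snap, snap.value() + step)) {
                    current = snap.value();
                    max = snap.value() + step;
                }
                // on a lost race the loop re-reads the node and tries again
            }
            return ++current;
        }
    }

    public static void main(String[] args) {
        VersionedNode node = new VersionedNode();
        SegmentAllocator a = new SegmentAllocator(node, 10);
        SegmentAllocator b = new SegmentAllocator(node, 10);
        System.out.println(a.nextId()); // 1  (a claims 1-10)
        System.out.println(b.nextId()); // 11 (b claims 11-20)
        System.out.println(a.nextId()); // 2  (served locally, no round-trip)
    }
}
```

The trade-off is visible in the output: IDs are unique across instances but only increasing within one instance, and a restarted instance abandons the rest of its segment.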


Naming Cluster Nodes

import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.CreateMode;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Simulated ZK-based WorkerID allocator for the Snowflake algorithm
 */
@Slf4j
public class SnowflakeIdWorker {

    private final CuratorFramework zkClient;
    private final String pathPrefix = "/owlias/worker-";

    private String pathRegistered = null;
    private Long workerId = null;
    private final AtomicBoolean isInit = new AtomicBoolean(false); // ensures initialization runs only once

    // The constructor receives a client that has already been STARTED
    public SnowflakeIdWorker(CuratorFramework zkClient) {
        this.zkClient = zkClient;
        this.init();
    }

    /**
     * Register an ephemeral sequential node in ZK to obtain a WorkerID
     */
    public synchronized void init() {
        if (isInit.get()) return;

        try {
            // 1. Create an ephemeral sequential node (store the local IP as its data to ease troubleshooting)
            String localIp = InetAddress.getLocalHost().getHostAddress();
            pathRegistered = zkClient.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                    .forPath(pathPrefix, localIp.getBytes(StandardCharsets.UTF_8));

            // 2. Extract the sequence number (ZK returns a path like /owlias/worker-0000000001)
            String sid = pathRegistered.substring(pathRegistered.lastIndexOf("-") + 1);
            long rawId = Long.parseLong(sid);

            // 3. Key constraint: a Snowflake workerId usually has only 10 bits (0-1023). Taking the ZK sequence
            //    modulo 1024 keeps it in range, but does NOT guarantee uniqueness: once the counter passes 1023,
            //    a new worker can get the same remainder as a worker that is still alive
            this.workerId = rawId % 1024;

            isInit.set(true);
            log.info("Snowflake worker registered! Raw sequence: {}, assigned WorkerID: {}, path: {}",
                    rawId, this.workerId, pathRegistered);
        } catch (Exception e) {
            log.error("Snowflake worker registration failed", e);
            throw new RuntimeException("ZK registration failed, cannot start the Snowflake algorithm", e);
        }
    }

    /**
     * Get the assigned WorkerID
     */
    public long getWorkerId() {
        if (!isInit.get()) {
            throw new IllegalStateException("Worker has not been initialized");
        }
        return workerId;
    }

    /**
     * Delete the ephemeral node on shutdown (ZK removes it when the session ends anyway, but explicit cleanup is cleaner)
     */
    public void destroy() {
        if (pathRegistered != null) {
            try {
                zkClient.delete().guaranteed().forPath(pathRegistered);
                log.info("Snowflake worker node deregistered: {}", pathRegistered);
            } catch (Exception e) {
                log.warn("Failed to deregister worker node: {}", e.getMessage());
            }
        }
    }

    public static void main(String[] args) {
        CuratorFramework client = CuratorClientFactory.createSimple("192.168.1.149:2181");
        client.start();
        try {
            // The constructor already calls init(); a second call would be a no-op
            SnowflakeIdWorker worker = new SnowflakeIdWorker(client);
            long workerIdResult = worker.getWorkerId();
            System.out.println(workerIdResult);
            worker.destroy();
        } finally {
            CloseableUtils.closeQuietly(client);
        }
    }
}
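The workerId obtained above normally fills the 10 worker bits of a Snowflake ID. For reference, here is a minimal sketch of the common 1 + 41 + 10 + 12 bit layout (sign, timestamp, worker, sequence). The class name, the custom epoch (2024-01-01T00:00:00Z) and the policy of throwing when the clock moves backwards are assumptions for illustration, not part of the original code.

```java
/** Hypothetical Snowflake sketch: 41-bit timestamp, 10-bit workerId, 12-bit sequence. */
public class SnowflakeSketch {
    private static final long EPOCH_MS = 1704067200000L; // assumed custom epoch: 2024-01-01T00:00:00Z
    private static final int WORKER_BITS = 10;
    private static final int SEQUENCE_BITS = 12;
    private static final long MAX_WORKER_ID = (1L << WORKER_BITS) - 1;   // 1023
    private static final long SEQUENCE_MASK = (1L << SEQUENCE_BITS) - 1; // 4095

    private final long workerId;
    private long lastTimestamp = -1L;
    private long sequence = 0L;

    public SnowflakeSketch(long workerId) {
        if (workerId < 0 || workerId > MAX_WORKER_ID) {
            throw new IllegalArgumentException("workerId out of range: " + workerId);
        }
        this.workerId = workerId;
    }

    public synchronized long nextId() {
        long now = System.currentTimeMillis();
        if (now < lastTimestamp) { // assumed policy: refuse rather than risk duplicates
            throw new IllegalStateException("Clock moved backwards by " + (lastTimestamp - now) + " ms");
        }
        if (now == lastTimestamp) {
            sequence = (sequence + 1) & SEQUENCE_MASK;
            if (sequence == 0) { // 4096 IDs already used in this millisecond: spin until the next one
                while (now <= lastTimestamp) {
                    now = System.currentTimeMillis();
                }
            }
        } else {
            sequence = 0L;
        }
        lastTimestamp = now;
        return ((now - EPOCH_MS) << (WORKER_BITS + SEQUENCE_BITS)) | (workerId << SEQUENCE_BITS) | sequence;
    }

    /** Extracts the worker bits from an ID produced with this layout. */
    public static long workerIdOf(long id) {
        return (id >>> SEQUENCE_BITS) & MAX_WORKER_ID;
    }

    public static void main(String[] args) {
        SnowflakeSketch generator = new SnowflakeSketch(5);
        long id = generator.nextId();
        System.out.println(id + " -> worker " + workerIdOf(id));
    }
}
```

The range check only rejects values that do not fit in 10 bits. It cannot detect the duplicate-workerId case that the `rawId % 1024` mapping allows, which would produce colliding IDs.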


ZK Distributed Lock

import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hand-written ZooKeeper distributed lock, meant to illustrate the mechanics
 * Reentrant per thread, avoids the herd effect, relies on Curator for network fault tolerance, cleans up its node
 * Limitation: if the ZK session expires while the lock is held, the ephemeral node disappears
 * but the holding thread is not notified
 */
@Slf4j
public class ZkLock {
    private static final String ZK_PATH = "/owlias/locks";
    private static final String LOCK_PREFIX = ZK_PATH + "/lock_";
    private final CuratorFramework client;

    // ThreadLocal gives each thread its own lock state, so several threads can share one ZkLock object safely
    private final ThreadLocal<LockData> threadData = new ThreadLocal<>();

    private static class LockData { // inner class: the lock context of one thread
        final String fullPath;
        final AtomicInteger lockCount = new AtomicInteger(1);

        LockData(String fullPath) {
            this.fullPath = fullPath;
        }
    }

    public ZkLock(CuratorFramework client) {
        this.client = client;
        try {
            // Make sure the parent node exists
            if (client.checkExists().forPath(ZK_PATH) == null) {
                client.create().creatingParentsIfNeeded().forPath(ZK_PATH);
            }
        } catch (KeeperException.NodeExistsException ignored) {
            // another instance created it first
        } catch (Exception e) {
            log.error("Failed to initialize the lock parent node", e);
        }
    }

    /**
     * Blocking lock acquisition; returns false if an error occurs
     */
    public boolean lock() {
        // 1. Reentrancy: the current thread already holds the lock
        LockData data = threadData.get();
        if (data != null) {
            data.lockCount.incrementAndGet();
            return true;
        }

        // 2. Compete for the lock
        String createdPath = null;
        try {
            // Create an ephemeral sequential node
            createdPath = client.create()
                    .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                    .forPath(LOCK_PREFIX);

            // Loop until the lock is acquired or an exception is thrown
            while (true) {
                List<String> waiters = getSortedWaiters();
                String shortPath = createdPath.substring(ZK_PATH.length() + 1);
                int index = waiters.indexOf(shortPath);

                if (index < 0) {
                    throw new KeeperException.NoNodeException("Lock node disappeared unexpectedly: " + createdPath);
                }

                if (index == 0) {
                    // Lock acquired: our node is the smallest in the sequence
                    log.debug("Thread {} acquired the lock", Thread.currentThread().getName());
                    threadData.set(new LockData(createdPath));
                    return true;
                }

                // Not acquired: watch only the immediate predecessor (index - 1)
                String priorPath = ZK_PATH + "/" + waiters.get(index - 1);
                waitForPriorNode(priorPath);
            }
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt(); // restore the interrupt flag
            }
            log.error("Exception while acquiring the lock", e);
            // On failure, delete the node we created, otherwise later waiters would block behind it forever
            if (createdPath != null) {
                deleteNodeQuietly(createdPath);
            }
            return false;
        }
    }

    /**
     * Release the lock
     */
    public void unlock() {
        LockData data = threadData.get();
        if (data == null) {
            throw new IllegalMonitorStateException("The current thread does not hold the lock");
        }

        if (data.lockCount.decrementAndGet() == 0) {
            try {
                threadData.remove();
                deleteNodeQuietly(data.fullPath);
                log.debug("Thread {} released the lock", Thread.currentThread().getName());
            } catch (Exception e) {
                log.error("Failed to delete the node while releasing the lock", e);
            }
        }
    }

    /**
     * Wait until the predecessor node is deleted (blocking on a CountDownLatch)
     */
    private void waitForPriorNode(String priorPath) throws Exception {
        final CountDownLatch latch = new CountDownLatch(1);

        // CuratorCache handles reconnects for us, which is sturdier than a raw Watcher, though heavier:
        // Curator's own InterProcessMutex sets a one-shot watcher on the predecessor instead
        CuratorCache cache = CuratorCache.build(client, priorPath);
        CuratorCacheListener listener = CuratorCacheListener.builder()
                .forDeletes(oldNode -> latch.countDown())
                .build();

        cache.listenable().addListener(listener);
        cache.start();

        try {
            // Re-check that the predecessor still exists, in case it was deleted before the listener was in place (race)
            if (client.checkExists().forPath(priorPath) == null) {
                return;
            }
            // Block with a timeout so we never hang forever; after a timeout lock() simply re-checks the queue
            if (!latch.await(30, TimeUnit.SECONDS)) {
                log.warn("Timed out waiting for the lock, predecessor node: {}", priorPath);
            }
        } finally {
            cache.close(); // release resources
        }
    }

    private List<String> getSortedWaiters() throws Exception {
        List<String> children = client.getChildren().forPath(ZK_PATH);
        Collections.sort(children);
        return children;
    }

    private void deleteNodeQuietly(String path) {
        try {
            client.delete().guaranteed().forPath(path);
        } catch (Exception ignored) {
            // Swallowed: guaranteed() keeps retrying the delete in the background once the connection recovers
        }
    }

    /*public static void main(String[] args) throws Exception {
        // A hand-written lock helps you understand the principle, but in real production it is strongly recommended
        // to use Curator's InterProcessMutex directly: it handles many tricky edge cases (connection loss,
        // session expiry, a lock node created just before a connection loss, etc.) and is far more robust.
        CuratorFramework client = CuratorClientFactory.createSimple("192.168.1.149:2181");
        client.start();
        InterProcessMutex lock = new InterProcessMutex(client, "/owlias/main_lock");
        try {
            if (lock.acquire(10, TimeUnit.SECONDS)) {
                try {
                    System.out.println("Processing business logic..");
                } finally {
                    lock.release();
                }
            }
        } finally {
            CloseableUtils.closeQuietly(client);
        }
    }*/
}
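The heart of the lock above is one decision: "am I the smallest node, and if not, whom do I watch?". Pulling that decision out into a pure function makes the herd-avoidance rule easy to test: each waiter watches only its immediate predecessor, so releasing the lock wakes exactly one client. LockOrder and predecessorToWatch are hypothetical names; the input is the child-name list returned by getChildren(), and the sketch assumes every child shares the lock_ prefix so that lexical order equals creation order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

/** Hypothetical helper isolating the "who do I watch?" rule of a ZK sequential-node lock. */
public final class LockOrder {

    private LockOrder() {
    }

    /**
     * Returns empty if ourNode holds the lock (it is the smallest child); otherwise the child
     * immediately before it, which is the only node this waiter needs to watch.
     */
    public static Optional<String> predecessorToWatch(List<String> children, String ourNode) {
        List<String> sorted = new ArrayList<>(children);
        // Same prefix plus a 10-digit zero-padded suffix: lexical order matches creation order
        Collections.sort(sorted);
        int index = sorted.indexOf(ourNode);
        if (index < 0) {
            throw new IllegalStateException("Lock node vanished: " + ourNode);
        }
        return index == 0 ? Optional.empty() : Optional.of(sorted.get(index - 1));
    }

    public static void main(String[] args) {
        List<String> children = List.of("lock_0000000003", "lock_0000000001", "lock_0000000002");
        System.out.println(predecessorToWatch(children, "lock_0000000001")); // Optional.empty
        System.out.println(predecessorToWatch(children, "lock_0000000003")); // Optional[lock_0000000002]
    }
}
```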

Pros and cons of ZooKeeper distributed locks:

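  • Pros: a ZooKeeper distributed lock (such as Curator's InterProcessMutex) effectively provides mutual exclusion across processes, supports reentrancy, and is fairly simple to use. Because the lock nodes are ephemeral, a crashed holder's lock is released automatically once its session expires.
  • Cons: performance is limited. Every acquire and release dynamically creates and deletes an ephemeral node, and in ZooKeeper all writes (including node creation and deletion) are handled by the Leader, which must replicate each change to the Followers and wait for a quorum of them to acknowledge before committing. This frequent network communication makes performance a clear weak spot.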

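In short, ZooKeeper distributed locks are not recommended for high-performance, high-concurrency scenarios. Because ZooKeeper is highly available, its distributed locks are a good fit where concurrency is moderate. Among current distributed lock implementations, two approaches are mature and mainstream: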

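  • ZooKeeper-based distributed locks, suitable for scenarios that need high reliability (high availability) but have moderate concurrency;
  • Redis-based distributed locks, suitable for scenarios with very high concurrency and strict performance requirements, where reliability gaps can be covered by other measures.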